This vignette serves as a protocol for the processing of event-based rainfall data. The steps below cover imbibing, standardisation, and processing; the corresponding code to realise each step is given below. Data processing is done in R with ‘ipayipi’. The data were collected using Hobo data loggers installed in Texas tipping-bucket rain gauges.
ipayipi can be installed from GitHub. The overview below shows the code needed to execute the steps above. Each of these steps is further described in the following sections of this vignette. For more details and options for each step, see the respective function documentation/help files in R. The help files serve as the documentation protocol for each function used in ‘ipayipi’.
## 1. load package ----
library(ipayipi)
## 2. initiate pipeline ----
# define the general working directory: wherever data is going to be processed ...
wd <- "ipayipi_vignettes/rainfall_eg"
# setup pipeline directory structure
pipe_house <- ipip_house(work_dir = wd)
## 3. read & imbibe in data ----
logger_data_import_batch(pipe_house)
imbibe_raw_batch(pipe_house, data_setup = ipayipi::hobo_rain)
## 4. standardise data ----
header_sts(pipe_house)
phenomena_sts(pipe_house)
## store standardised data ----
# transfer standardised data to the nomvet_room
transfer_sts_files(pipe_house)
## 5. append standardised data files ----
append_station_batch(pipe_house)
## 6. summarise gaps ----
gap_eval_batch(pipe_house)
## 7. Process data ----
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq)
This vignette serves as a protocol for processing rainfall data, largely from Texas tipping-bucket rain gauges, using R ipayipi. ipayipi facilitates a dynamic and structured pipeline so that processing down a pipeline (and up again) is traceable.
Texas tipping-bucket rain gauges with Hobo data loggers record rainfall events in a discontinuous or ‘event-based’ manner: the tipping bucket of known volume ‘tips’ each time it fills with water from the gauge. Each tip gets tallied as an event so that cumulative rainfall can be recorded. Note that this ‘discontinuous’ event recording contrasts with the ‘continuous’ record-interval type associated with many default logger settings, which record a sample of a phenomenon at a specific time and set interval, or an average over a set record interval. The ‘discontinuous’ nature of event-based time-series data has important implications for how the data are processed.
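To make the ‘discontinuous’ idea concrete, here is a minimal base-R sketch (not ipayipi code; the toy values are hypothetical) that turns a cumulative tip record into per-tip amounts and daily totals:

```r
# Illustrative only (not ipayipi code): a toy event-based record where
# each row is one bucket tip and 'rain_cumm' is the cumulative total (mm).
tips <- data.frame(
  date_time = as.POSIXct(
    c("2019-01-16 06:23:18", "2019-01-16 06:28:15",
      "2019-01-16 06:36:08", "2019-01-17 11:24:44"),
    tz = "UTC"
  ),
  rain_cumm = c(0.254, 0.508, 0.762, 1.016)
)
# per-tip rainfall: first differences of the cumulative series
tips$rain_mm <- c(tips$rain_cumm[1], diff(tips$rain_cumm))
# daily totals: note that days with no tips have no rows at all
daily <- aggregate(rain_mm ~ as.Date(date_time), data = tips, FUN = sum)
```

Days with no tips simply have no rows, so the absence of data alone cannot distinguish ‘no rain’ from ‘logger not recording’; this is why gap evaluation matters for event-based series.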
The ‘pipe_house’ is the name of the directory structure wherein a data-pipeline is maintained. By initiating the pipeline, a number of directories will be created for ‘housing’ data at various stages of archival and processing, as well as storing scripts, and reports.
To start off, the relative directory where processing is going to be done must be defined.
# setting up the 'pipe_house'
## define pipeline working directory 'wd' (must be relative)
wd <- "ipayipi_vignettes/rainfall_eg"
## initiate pipeline
pipe_house <- ipip_house(work_dir = wd)
print(pipe_house)
## $r
## r
## "ipayipi_vignettes/rainfall_eg/r"
##
## $source_room
## source_room
## "ipayipi_vignettes/rainfall_eg/source_room"
##
## $wait_room
## wait_room
## "ipayipi_vignettes/rainfall_eg/wait_room"
##
## $nomvet_room
## nomvet_room
## "ipayipi_vignettes/rainfall_eg/nomvet_room"
##
## $ipip_room
## ipip_room
## "ipayipi_vignettes/rainfall_eg/ipip_room"
##
## $dta_out
## dta_out
## "ipayipi_vignettes/rainfall_eg/dta_out"
##
## $reports
## reports
## "ipayipi_vignettes/rainfall_eg/reports"
##
## $raw_room
## raw_room
## "ipayipi_vignettes/rainfall_eg/raw_room"
##
## $work_dir
## [1] "ipayipi_vignettes/rainfall_eg"
What has ipip_house() done? It has created the directories listed in the output above, if they don’t already exist*.
*NB! Running this function will not overwrite existing data.
In this step, incoming data gets pulled from the pipeline’s data source, that is, the ‘source room’ (pipe_house$source_room), into the ‘waiting room’ (pipe_house$wait_room). The example data contains multiple years of Hobo rainfall-data exports from SAEON rainfall stations in northern Maputaland, South Africa.
# copy data from source to the wait_room
logger_data_import_batch(pipe_house = pipe_house)
Now that some data is in the ‘wait_room’ directory we can read it into R. Note the ‘data_setup’ option for Hobo data-logger file exports, ipayipi::hobo_rain. Also, the record_interval_type has been set to ‘event_based’; other options include ‘mixed’ and ‘continuous’. Use ‘event_based’ for Hobo data (including cases where it is ‘mixed’, to cater for changes in future outputs). More on this below.
imbibe_raw_batch(pipe_house = pipe_house,
data_setup = ipayipi::hobo_rain, # standard for reading hobo file exports
record_interval_type = "event_based"
)
For additional insight into data-input formats, that is, the ‘data_setup’ argument, see the help files of the imbibe_raw_logger_dt() function (i.e., ?imbibe_raw_logger_dt). The ipayipi::hobo_rain setup caters for three variations in format associated with Hoboware exports. These three variations allow the import of hundreds of differently formatted (unencrypted) Hobo data-file exports, and more variations can be added.
Record-interval type is an important parameter. ipayipi handles continuous, event-based (discontinuous), and mixed time-series data types. Record intervals get evaluated using the record_interval_eval() function. Record-interval information will be important for further steps, such as identifying ‘gaps’ or missing data automatically. More on this below.
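As a rough illustration of what record-interval evaluation involves (this is not the internals of record_interval_eval(); the heuristic and the 0.9 threshold are invented for this sketch), a modal time step can separate regular from event-based series:

```r
# Illustrative only: guess a record-interval type from timestamp spacing.
guess_interval <- function(dttm) {
  d <- diff(as.numeric(dttm)) # gaps between records, in seconds
  md <- as.numeric(names(which.max(table(d)))) # modal gap
  # if nearly all gaps equal the modal gap, treat the series as continuous
  if (mean(d == md) > 0.9) {
    list(type = "continuous", interval_s = md)
  } else {
    list(type = "event_based", interval_s = NA)
  }
}
x <- as.POSIXct("2019-01-01", tz = "UTC") + seq(0, 3600, by = 300)
guess_interval(x)$type # a regular 5-min series reads as "continuous"
```

As the text notes, a file with very few records defeats any such heuristic, which is one reason Hobo event data is declared ‘event_based’ up front.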
Both file-header information, plus other phenomena (variable) metadata, will now be standardised. The spelling/synonyms of file names and associated header metadata have to be scrutinised first. Only after header information gets standardised can we move onto the phenomena.
# standardise header nomenclature
header_sts(pipe_house)
If it is the first time running header_sts(), or new synonyms get introduced into the pipe-house directory, header_sts() will produce a warning. This normally happens when a station name is adjusted in a logger program, or a new station is introduced into the pipeline; the pipeline then needs the new nomenclature standards to be defined.
Unstandardised names (or columns) have the prefix ‘uz’. These standards get stored in a file called ‘nomtab.rns’ in the ‘waiting room’. If this file is deleted, a new one will be generated, but the user will have to populate the tables with synonym vocab.
The nomenclature table in the ‘waiting room’ can be updated from the csv file (or directly in R). If a new synonym gets introduced, the file containing new nomenclature will be skipped in further processing, and a ‘csv’ version of the ‘nomtab.rns’ will be copied to the ‘waiting room’ for editing.
Only the following fields (those with NAs) require editing in the ‘nomtab’ ‘csv’. Note that the record-interval fields are estimated by record_interval_eval(); however, if there are insufficient data records, e.g., one record in a continuous data flow, this needs manual checking. For Hobo data the record-interval type must be set to ‘event_based’ (even if the data is mixed).

| uz_station | location | station | stnd_title | logger_type | logger_title | uz_record_interval_type | uz_record_interval | record_interval_type | record_interval | uz_table_name | table_name |
|---|---|---|---|---|---|---|---|---|---|---|---|
| Plot Title: Coastal Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot Title: Mseleni TC | mcp | mabasa_tc_rn | mcp_mabasa_tc_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot Title: Sileza | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Coastal_cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Mseleni_TC | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Sileza_Camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Sileza_camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_title_Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_title_Mseleni_TC | mcp | mabasa_tc_rn | mcp_mabasa_tc_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Sileza_Camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
Scrolling up and down the table above, we see that we are dealing with four different rain gauges or ‘stations’; in the raw data, however, there are multiple synonyms for each station’s standard title. The supplied ‘table name’ (visible when scrolling to the right) will be the name of the table holding the associated raw data in an ‘ipayipi’ station file; the standard-title field will form the file name of the station. Standardising these fields as in the table above is important for further processing of the data.
Once the NA values of the above fields have been populated, the edited ‘csv’ will be imbibed into the pipeline structure when rerunning header_sts(pipe_house). This function will imbibe the most recently updated ‘csv’ nomenclature table from the ‘wait_room’ into the pipeline, and standardise header nomenclature.
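The ‘csv’ can also be populated programmatically before rerunning header_sts(). A minimal base-R sketch (the synonym lookup and output file name are illustrative; the column names follow the nomtab table above):

```r
# Illustrative only: fill NA nomenclature fields from a synonym lookup.
nomtab <- data.frame(
  uz_station = c("Plot Title: Sileza", "Sileza_Camp"),
  station = NA_character_,
  stnd_title = NA_character_,
  stringsAsFactors = FALSE
)
# map each raw synonym onto one standard station name
syn <- c(
  "Plot Title: Sileza" = "sileza_camp_rn",
  "Sileza_Camp" = "sileza_camp_rn"
)
nomtab$station <- unname(syn[nomtab$uz_station])
nomtab$stnd_title <- paste0("mcp_", nomtab$station)
# write.csv(nomtab, "nomtab.csv", row.names = FALSE) # then rerun header_sts()
```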
Tip: Double-check your nomenclature standards in the ‘csv’ file before running header_sts() again! In step with good tidy-data standards, keep nomenclature to ‘snake case’ with no special characters (bar the useful underscore).
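A tiny helper along those lines (illustrative, not part of ipayipi) that coerces an arbitrary raw name to snake case:

```r
# Illustrative only: coerce a raw name to snake case.
to_snake <- function(x) {
  x <- gsub("[^A-Za-z0-9]+", "_", x) # runs of non-alphanumerics -> "_"
  x <- gsub("^_|_$", "", x) # trim leading/trailing underscores
  tolower(x)
}
to_snake("Plot Title: Coastal Cashews") # "plot_title_coastal_cashews"
```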
Standardising phenomena metadata follows a similar process to header-data standardisation. If the phenomena standards have been described and there is a ‘phentab.rps’ in the ‘waiting room’, running the code below updates the phenomena details of all files.
# standardise phenomena
phenomena_sts(pipe_house = pipe_house)
Tip: Setting the remove_dups argument to TRUE in phenomena_sts() allows the user to interactively remove duplicate phenomena through a prompt interface, on the off chance logger exports have included these. Any interactive (via the prompt line) functionality is only possible if parallel processing is sequential. See the section on parallel processing at the end.
If there is no ‘phenomena table’ (‘phentab.rps’), one will be generated, and the NA values in its ‘csv’ copy need to be described. The following fields in the ‘csv’ phentab must be populated:
Additional fields that are not mandatory include:
If an ‘f_convert’ factor (scroll right on the table below) is applied to a phenomenon, the standardised units must differ from the unstandardised units (uz_units) in the phenomena table. This ensures that phenomena that are appended have consistent units.
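The ‘f_convert’ idea is simply a multiplicative factor from unstandardised to standard units. A short example (the raw values are hypothetical; the inches-to-millimetres factor is 25.4):

```r
# Illustrative only: apply an 'f_convert' factor to standardise units.
uz_rain_in <- c(0.01, 0.02, 0.03) # hypothetical raw tip values, in inches
f_convert <- 25.4 # inches -> millimetres
rain_mm <- uz_rain_in * f_convert # standardised values, in mm
```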
| phen_name_full | phen_type | phen_name | units | measure | offset | var_type | uz_phen_name | uz_units | uz_measure | f_convert | sensor_id | notes |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached | no_spec | no_spec | NA | NA | NA |
| Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
| Interference event: bad battery | Interference event | bad_battery | event | event | NA | fac | Bad Battery (LGR S/N: 20116793) | no_spec | no_spec | NA | NA | NA |
| Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected | no_spec | no_spec | NA | NA | NA |
| Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
| Interference event: coupler detached | Interference event | detached | event | event | NA | fac | Coupler Detached | no_spec | no_spec | NA | NA | NA |
| Interference event: end of file | Interference event | end_of_file | event | event | NA | fac | End Of File | no_spec | no_spec | NA | NA | NA |
| Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Event, mm (LGR S/N: 20112169, SEN S/N: 20112169) | no_spec | no_spec | NA | NA | NA |
| Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Rainfall (mm) #20102559 | no_spec | no_spec | NA | NA | NA |
| Interference event: stopped | Interference event | stopped | event | event | NA | fac | Stopped (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
After filling in details, to replace NA values (only in columns highlighted blue!), rerun phenomena_sts(pipe_house) to imbibe the updated phenomena descriptions and update the logger data.
Standardised data files get transferred to the ‘nomenclature vetted’ directory (the ‘nomvet_room’) using the function below. After being transferred, files in the waiting room (except the nomtab and phentab standards) are automatically removed.
Note in the ‘waiting room’ how the file extension of each file changes as it is standardised. Successfully imbibed files have an ‘ipr’ extension. Post header standardisation the extension changes to ‘iph’; the extension becomes ‘ipi’ when phenomena synonyms are corrected.
# move standardised files to a storage directory
transfer_sts_files(pipe_house)
Archiving raw data files: Before removing raw unstandardised files, if there is a ‘raw_room’ directory in the pipeline working directory, raw input data files will be copied to this directory and filed in folders by year and month of the last date of recording. This is done by the imbibe_raw_batch() function.
Notes on the phenomena standards: Apart from the usual ‘rain_cumm’ phenomenon, where the cumulative total of rainfall ‘tips’ is summed, Hobo logger systems document ‘interference’ events. Interference events denote when loggers were ‘interacted’ with in the field, e.g., ‘host connected’ or ‘attached’ for when a download cable is attached to the logger. This information will be useful for processing the data later on.
The append_station_batch() function updates station files in the ‘ipip_room’ with files from the ‘nomvet_room’. append_station_batch() optimises ‘non-NA’ data retention between overlapping ‘date-time’ stamps; this is an important consideration for Hobo data-logger exports, which can differ depending on the computing-system environment.
## |============== Updating 0 stations with 0 standarised files ... ==============|
## |============================= stations appended ============================|
Now that station files have been generated, let’s examine what has been done so far. Station files are maintained in the ‘ipip_room’ of the pipeline’s folder structure.
# list station files in the ipip directory
sflist <- dta_list(
input_dir = pipe_house$ipip_room, # search directory
file_ext = ".ipip" # note the station's default file extension
)
# check what stations are in the ipip room
print(sflist)
## [1] "mcp_coastal_cashews_rn.ipip" "mcp_mabasa_tc_rn.ipip" "mcp_manz_office_rn.ipip" "mcp_sileza_camp_rn.ipip"
# read in the station file
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[1]))
# names of the tables stored in the station file
names(sf)
## [1] "data_summary" "dt_1_days_agg" "dt_1_days_agg_saws" "dt_1_hours_agg" "dt_1_months_agg" "dt_1_years_agg" "dt_5_mins_agg" "dt_pseudo_event" "dt_rain" "dt_rain_se"
## [11] "f_params" "gaps" "logg_interfere" "meta_events" "phen_data_summary" "phens" "phens_dt" "pipe_seq" "pseudo_events" "raw_rain"
Our station file has one ‘raw’ data table, the ‘raw_rain’ table, composed of the event data below.
# a look at the first few data rows
head(sf$raw_rain)
## id date_time attached connected detached end_of_file rain_cumm stopped
## <num> <POSc> <fctr> <fctr> <fctr> <fctr> <num> <fctr>
## 1: 1 2019-01-14 16:17:19 0.000
## 2: 2 2019-01-14 16:18:30 Logged NA
## 3: 3 2019-01-16 06:23:18 0.254
## 4: 4 2019-01-16 06:28:15 0.508
## 5: 5 2019-01-16 06:36:08 0.762
## 6: 6 2019-01-16 11:24:44 1.016
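When working with a table like this, note that interference rows carry no rainfall reading. A base-R sketch (toy data mirroring the rows above) of separating the two kinds of rows:

```r
# Illustrative only: drop logger-interference rows, keep rainfall rows.
rr <- data.frame(
  date_time = as.POSIXct(
    c("2019-01-14 16:17:19", "2019-01-14 16:18:30", "2019-01-16 06:23:18"),
    tz = "UTC"
  ),
  connected = c(NA, "Logged", NA),
  rain_cumm = c(0.000, NA, 0.254),
  stringsAsFactors = FALSE
)
rain_only <- rr[!is.na(rr$rain_cumm), ] # rows with a rainfall reading
```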
| dsid | file_format | uz_station | location | station | stnd_title | start_dttm | end_dttm | logger_type | logger_title | logger_sn | logger_os | logger_program_name | logger_program_sig | uz_record_interval_type | uz_record_interval | record_interval_type | record_interval | dttm_inc_exc | dttm_ie_chng | uz_table_name | table_name | nomvet_name | file_origin |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-01-14 16:17:19 | 2019-03-13 14:07:53 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190114-20190313__1.ipi | NA |
| 2 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-01-14 16:17:19 | 2019-04-11 13:50:38 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190114-20190411__1.ipi | NA |
| 3 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-01-14 16:17:19 | 2019-05-15 09:27:29 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190114-20190515__1.ipi | NA |
| 4 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-01-14 16:17:19 | 2019-07-09 09:03:39 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190114-20190709__1.ipi | NA |
| 5 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-07-09 10:05:04 | 2019-08-13 15:21:31 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190709-20190813__1.ipi | NA |
| 6 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-08-13 15:33:12 | 2019-09-11 11:43:00 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190813-20190911__1.ipi | NA |
| 7 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-08-13 15:33:12 | 2019-10-09 08:53:19 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190813-20191009__1.ipi | NA |
| 8 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-08-13 15:33:12 | 2019-11-13 09:23:40 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190813-20191113__1.ipi | NA |
| 9 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2019-08-13 15:33:12 | 2019-12-12 09:57:13 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20190813-20191212__1.ipi | NA |
| 10 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-01-15 10:29:41 | 2020-02-14 12:21:12 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200115-20200214__1.ipi | NA |
| 11 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-01-15 10:29:41 | 2020-03-09 15:58:07 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200115-20200309__1.ipi | NA |
| 12 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-07-09 10:04:38 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20200709__1.ipi | NA |
| 13 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-08-14 13:33:18 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20200814__1.ipi | NA |
| 14 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-09-11 13:30:58 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20200911__1.ipi | NA |
| 15 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-10-15 10:55:22 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20201015__1.ipi | NA |
| 16 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-11-19 13:37:47 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20201119__1.ipi | NA |
| 17 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2020-12-09 14:23:29 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20201209__1.ipi | NA |
| 18 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2020-03-09 16:59:58 | 2021-02-11 10:23:06 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20200309-20210211__1.ipi | NA |
| 19 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-03-09 11:06:52 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210309__1.ipi | NA |
| 20 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-04-13 12:36:46 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210413__1.ipi | NA |
| 21 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-05-14 15:30:17 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210514__1.ipi | NA |
| 22 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-06-17 10:07:51 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210617__1.ipi | NA |
| 23 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-07-20 11:39:31 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210720__1.ipi | NA |
| 24 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-08-12 09:26:17 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210812__1.ipi | NA |
| 25 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-09-15 15:27:21 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20210915__1.ipi | NA |
| 26 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-02-11 11:12:10 | 2021-10-14 11:13:36 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20210211-20211014__1.ipi | NA |
| 27 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-11-11 11:45:32 | 2021-12-14 14:47:35 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20211111-20211214__1.ipi | NA |
| 28 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-11-11 11:45:32 | 2022-01-13 13:18:25 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20211111-20220113__1.ipi | NA |
| 29 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-11-11 11:45:32 | 2022-02-10 07:54:55 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20211111-20220210__1.ipi | NA |
| 30 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-11-11 11:45:32 | 2022-03-10 17:19:55 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20211111-20220310__1.ipi | NA |
| 31 | hobo_csv_export | Plot_Title_Coastal_cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2021-11-11 11:45:32 | 2022-05-17 14:40:56 | hobo_pendant | NA | hobo_data | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20211111-20220517__1.ipi | NA |
| 32 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-07-13 10:43:59 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20220713__1.ipi | NA |
| 33 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-08-10 12:37:41 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20220810__1.ipi | NA |
| 34 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-09-15 17:37:15 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20220915__1.ipi | NA |
| 35 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-10-13 14:25:31 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20221013__1.ipi | NA |
| 36 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-11-07 13:50:14 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20221107__1.ipi | NA |
| 37 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-05-17 14:56:22 | 2022-12-13 15:41:24 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20220517-20221213__1.ipi | NA |
| 38 | hobo_csv_export | Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | 2022-12-13 16:29:28 | 2023-01-11 17:03:29 | hobo_pendant | NA | 20116788 | NA | NA | NA | event_based | discnt | event_based | discnt | TRUE | FALSE | hobo_data | raw_rain | mcp_coastal_cashews_rn_discnt_20221213-20230111__1.ipi | NA |
| phid | phen_name_full | phen_type | phen_name | units | measure | offset | var_type | uz_phen_name | uz_units | uz_measure | f_convert | sensor_id | notes | table_name |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 2 | Interference event: coupler detached | Interference event | detached | event | event | NA | fac | Coupler Detached (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 3 | Interference event: end of file | Interference event | end_of_file | event | event | NA | fac | End Of File (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 4 | Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 11 | Interference event: stopped | Interference event | stopped | event | event | NA | fac | Stopped (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 5 | Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Rainfall, mm (LGR S/N: 20116788, SEN S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| phid | phen_name | start_dttm | end_dttm | table_name |
|---|---|---|---|---|
| 1 | attached | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 4 | connected | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 2 | detached | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 3 | end_of_file | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 5 | rain_cumm | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 11 | stopped | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
Phenomena append notes: when appending phenomena tables with overlapping data, ‘ipayipi’ examines each overlapping phenomena series in turn and overwrites either the new data (e.g., additions to a station file) or the existing station-file data. The phenomena data summary keeps a temporal record of how phenomena have been appended; maintaining these records helps data processing further down the pipeline.
Handling big data: to minimise memory usage when processing station files, they are ‘decompressed’ into temporary station files that, by default, persist for the duration of an R session. These temporary files are indexed, chunked, and read by date-time. While this reduces memory allocation for further processing, there is an initial time cost to chunking the data when a station is first used in an R session under this default setting.
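The chunking idea can be illustrated with a minimal standalone sketch (plain data.table, not ipayipi’s internal code; all names are illustrative): records are keyed by a date-time-derived chunk index so that only the chunks overlapping a query window need to be read.

```r
# Illustrative sketch of date-time chunking (not ipayipi internals).
library(data.table)

# a mock event series: 400 six-hourly records
dta <- data.table(
  date_time = seq(as.POSIXct("2023-01-01 00:00:00", tz = "UTC"),
    by = "6 hours", length.out = 400
  ),
  rain_mm = rep(c(0, 0.254), 200)
)
# index each record by calendar month -- the 'chunk' key
dta[, chunk := format(date_time, "%Y-%m")]
chunks <- split(dta, by = "chunk") # in practice each chunk is a file on disk

# to serve a query, read back only the chunks overlapping the window
q_start <- as.POSIXct("2023-02-10 00:00:00", tz = "UTC")
q_end <- as.POSIXct("2023-03-05 00:00:00", tz = "UTC")
keep <- unique(format(seq(q_start, q_end, by = "1 day"), "%Y-%m"))
sub <- rbindlist(chunks[keep])[date_time >= q_start & date_time <= q_end]
```

The one-off cost of building the chunk index is repaid on every subsequent query, since out-of-window chunks are never loaded.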
Event metadata collected in the field, such as checks on the physical rain gauges, is important to integrate into the data pipeline. Below we import event metadata from a database of field records and save it in the pipeline’s data-processing directory.
## update station metadata ----
rl <- googlesheets4::read_sheet(
ss = "1b-dorAHYT21xa8vWS8tPu7M9NIkc9lrt2yYPVEnxX_o",
sheet = "mcp_event_data_generic",
range = "A1:Q341",
col_names = TRUE,
col_types = c("ilTccccdddcllcTTc"),
na = ""
)
names(rl)[3] <- "date_time"
meta_read(meta_file = rl, col_types = "ilTccccdddcllcTTc",
output_dir = pipe_house$ipip_room, output_name = "event_db"
)
The event metadata is then pushed to the appropriate stations using the function below.
meta_to_station(pipe_house = pipe_house, meta_file = "event_db")
Checking for data ‘gaps’ in continuous data streams is
straightforward: just highlight the missing/NA values. With
discontinuous or event-based data it is more nuanced.
gap_eval_batch() identifies gap periods, i.e., where a logger
was not recording, in both continuous and discontinuous time-series
data. For discontinuous data streams, a gap is flagged when the duration
between a logger–host connection (a download in the field) and the next
start of logging (derived from the interference events above) exceeds a
threshold. The default threshold is 10 minutes; in the example below,
however, a six-hour threshold (21 600 s; the gap_problem_thresh_s
column) was applied.
gap_eval_batch(pipe_house)
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[4]))[["gaps"]]
print(sf)
## gid eid gap_type phen table_name gap_start gap_end dt_diff_s gap_problem_thresh_s problem_gap notes
## <int> <lgcl> <char> <char> <char> <POSc> <POSc> <difftime> <num> <lgcl> <char>
## 1: 1 NA auto logger raw_rain 2021-03-30 14:21:10 2021-03-31 11:23:15 75725 secs 21600 TRUE <NA>
## 2: 2 NA auto logger raw_rain 2023-06-09 09:59:03 2023-08-10 09:02:45 5353422 secs 21600 TRUE <NA>
## 3: 3 NA auto logger raw_rain 2024-04-17 08:32:36 2024-05-07 10:50:46 1736290 secs 21600 TRUE <NA>
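The gap rule for discontinuous data can be sketched with plain data.table (an illustration, not gap_eval_batch()’s internal code): flag any download-to-logging-restart interval that exceeds the threshold.

```r
# Illustrative sketch of the discontinuous-data gap rule.
library(data.table)

# mock logger-interference times: each download ('connected') paired with
# the next start of logging
interfere <- data.table(
  connected = as.POSIXct(
    c("2021-03-30 14:21:10", "2021-06-01 08:00:00"), tz = "UTC"
  ),
  logging_start = as.POSIXct(
    c("2021-03-31 11:23:15", "2021-06-01 08:04:00"), tz = "UTC"
  )
)
gap_problem_thresh_s <- 6 * 60 * 60 # six hours, as in the table above

gaps <- interfere[, .(
  gap_start = connected,
  gap_end = logging_start,
  dt_diff_s = as.numeric(difftime(logging_start, connected, units = "secs"))
)]
gaps[, problem_gap := dt_diff_s > gap_problem_thresh_s]
```

Here the first interval (over 21 hours) is flagged as a problem gap, while the second (four minutes, i.e., an ordinary field download) is not.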
In the graph below, only the Sileza rainfall station shows a data gap exceeding the threshold. Hover over the graph to interact with it.
p <- dta_availability(pipe_house = pipe_house, plot_tbls = "raw_rain")
ggplotly(p$plt, tooltip = "text")
Gaps are highlighted in dark red.
The gap_eval_batch() function has detailed help files on
how to structure event metadata. This event metadata is used when
‘gaps’ are defined. These gaps will need to be imputed with appropriate
data in further processing steps not covered here.
Next, data are processed using a custom ‘pipe sequence’. The pipe
sequence details processing operations stage by stage and step by step.
The pipe sequence object pipe_seq is read into the R
environment by sourcing its script, as below. This script serves as a
reference for processing the data. More on the pipe sequence at the end
of this document.
source('ipayipi_vignettes/rainfall_eg/r/pipe_seq/txs_tb.r')
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq)
Note the ‘dt_’ prefix of processed data tables, and the embedded
pipe_seq table in the updated station file below. There is
also a table of ‘pseudo events’, i.e., false tips.
# read in the station file
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[1]))
# names of the tables stored in the station file
names(sf)
## [1] "data_summary" "dt_1_days_agg" "dt_1_days_agg_saws" "dt_1_hours_agg" "dt_1_months_agg" "dt_1_years_agg" "dt_5_mins_agg" "dt_pseudo_event" "dt_rain" "dt_rain_se"
## [11] "f_params" "gaps" "logg_interfere" "meta_events" "phen_data_summary" "phens" "phens_dt" "pipe_seq" "pseudo_events" "raw_rain"
Visualising the cumulative sum of phenomena is useful for examining rates of change and correlations between datasets.
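For instance, on an aggregated daily series the cumulative sum is a one-liner (a generic data.table sketch; the column names are illustrative):

```r
# Illustrative sketch: cumulative rainfall from a daily aggregate.
library(data.table)
d <- data.table(
  date_time = seq(as.Date("2023-01-01"), by = "day", length.out = 7),
  rain_tot = c(0, 2.5, 0, 10.2, 0.3, 0, 5.1)
)
d[, rain_cumsum := cumsum(rain_tot)] # plot rain_cumsum vs date_time
```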
Station data can be queried across or within pipelines using
dta_flat_pull(). Data can be saved to file in ‘csv’ format,
as shown below.
# wide format
# when writing to csv the default output directory is pipe_house$dta_out
dta_flat_pull(pipe_house = pipe_house,
phen_name = "rain_tot", # name of the phenomena --- exact match
tab_names = "dt_1_years_agg", # table within which to search for phen
out_csv = TRUE,
out_csv_preffix = "mcp"
)
## $dta
## date_time mcp_coastal_cashews_rn mcp_mabasa_tc_rn mcp_manz_office_rn mcp_sileza_camp_rn
## <POSc> <num> <num> <num> <num>
## 1: 2018-01-01 NA NA 672.338 69.088
## 2: 2019-01-01 584.200 NA 976.376 584.454
## 3: 2020-01-01 606.298 NA 1700.276 523.494
## 4: 2021-01-01 768.604 237.490 2021.840 724.916
## 5: 2022-01-01 834.644 635.762 2328.672 654.558
## 6: 2023-01-01 0.508 NA 2203.704 704.850
## 7: 2024-01-01 NA NA 1314.704 246.380
##
## $time_interval
## dt_1_years_agg
## "1_years"
##
## $stations
## [1] "mcp_coastal_cashews_rn.ipip" "mcp_mabasa_tc_rn.ipip" "mcp_manz_office_rn.ipip" "mcp_sileza_camp_rn.ipip"
##
## $tab_names
## [1] "dt_1_years_agg"
##
## $phen_name
## [1] "rain_tot"
##
## $file_ext
## [1] ".ipip"
# long format
d <- dta_flat_pull(pipe_house = pipe_house, phen_name = "rain_tot",
tab_names = "dt_1_years_agg", wide = FALSE
)
d$dta
## date_time stnd_title rain_tot
## <POSc> <fctr> <num>
## 1: 2018-01-01 mcp_coastal_cashews_rn NA
## 2: 2019-01-01 mcp_coastal_cashews_rn 584.200
## 3: 2020-01-01 mcp_coastal_cashews_rn 606.298
## 4: 2021-01-01 mcp_coastal_cashews_rn 768.604
## 5: 2022-01-01 mcp_coastal_cashews_rn 834.644
## 6: 2023-01-01 mcp_coastal_cashews_rn 0.508
## 7: 2024-01-01 mcp_coastal_cashews_rn NA
## 8: 2018-01-01 mcp_mabasa_tc_rn NA
## 9: 2019-01-01 mcp_mabasa_tc_rn NA
## 10: 2020-01-01 mcp_mabasa_tc_rn NA
## 11: 2021-01-01 mcp_mabasa_tc_rn 237.490
## 12: 2022-01-01 mcp_mabasa_tc_rn 635.762
## 13: 2023-01-01 mcp_mabasa_tc_rn NA
## 14: 2024-01-01 mcp_mabasa_tc_rn NA
## 15: 2018-01-01 mcp_manz_office_rn 672.338
## 16: 2019-01-01 mcp_manz_office_rn 976.376
## 17: 2020-01-01 mcp_manz_office_rn 1700.276
## 18: 2021-01-01 mcp_manz_office_rn 2021.840
## 19: 2022-01-01 mcp_manz_office_rn 2328.672
## 20: 2023-01-01 mcp_manz_office_rn 2203.704
## 21: 2024-01-01 mcp_manz_office_rn 1314.704
## 22: 2018-01-01 mcp_sileza_camp_rn 69.088
## 23: 2019-01-01 mcp_sileza_camp_rn 584.454
## 24: 2020-01-01 mcp_sileza_camp_rn 523.494
## 25: 2021-01-01 mcp_sileza_camp_rn 724.916
## 26: 2022-01-01 mcp_sileza_camp_rn 654.558
## 27: 2023-01-01 mcp_sileza_camp_rn 704.850
## 28: 2024-01-01 mcp_sileza_camp_rn 246.380
## date_time stnd_title rain_tot
p <- plot_m_anomaly(pipe_house = pipe_house, phen_name = "rain_tot",
phen_units = "mm"
)
p$plt[[3]]
p$plt[[4]]
## Warning: Removed 8 rows containing missing values or values outside the scale range (`geom_segment()`).
It’s not necessary to understand the detail of each function that
builds the pipe_seq object below. However, for customising
these functions, more information can be found in each function’s
help-file description, e.g., type ?pipe_seq in your R console. The
pipe_seq table from the station file processed above is
shown below. Note that function parameters (‘f_params’) are described
for each stage (‘dt_n’) and nested step (‘dtp_n’). The function for each
stage’s step is in the ‘f’ column. This pipe-sequence table makes it
easier to inspect the data-processing pipeline. When this table was
evaluated by dt_process_batch(), further parameters were
produced for each respective function and stored in the station-file
objects.
# using kableExtra
kbl(sf$pipe_seq) |> kable_paper("hover") |> kable_styling(font_size = 11) |>
scroll_box(width = "100%", height = "180px")
| dt_n | dtp_n | n | f | f_params | input_dt | output_dt | time_interval | start_dttm | end_dttm |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “meta_events”) | meta_events | dt_pseudo_event | discnt | NA | NA |
| 1 | 2 | 1 | dt_calc | dt[event_type == ‘pseudo_events’, ] | dt_pseudo_event | dt_pseudo_event | discnt | NA | NA |
| 1 | 2 | 2 | dt_calc | [stnd_title == station, ] | dt_pseudo_event | dt_pseudo_event | discnt | NA | NA |
| 1 | 2 | 3 | dt_calc | [qa == TRUE, ] | dt_pseudo_event | dt_pseudo_event | discnt | NA | NA |
| 2 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “raw_rain”) | raw_rain | dt_rain_se | discnt | NA | NA |
| 2 | 2 | 1 | dt_calc | dt[, fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 2 | 2 | 2 | dt_calc | [fktest == TRUE, ] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 2 | 2 | 3 | dt_calc | [, rain_mm := rain_cumm, ipip(measure = “tot”, units = “mm”, var_type = “num”)] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 2 | 2 | 4 | dt_calc | [, rain_mm := 0] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 2 | 2 | 5 | dt_calc | [, .(date_time, rain_mm)] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 3 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_rain_se”) | dt_rain_se | dt_rain_se | discnt | NA | NA |
| 3 | 2 | 1 | dt_calc | dt[, fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 3 | 2 | 2 | dt_calc | [fktest == TRUE, ] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 3 | 2 | 3 | dt_calc | [, rain_mm := 0] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 3 | 2 | 4 | dt_calc | [, .(date_time, rain_mm)] | dt_rain_se | dt_rain_se | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 4 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “raw_rain”) | raw_rain | dt_rain | discnt | NA | NA |
| 4 | 2 | 1 | dt_calc | dt[!is.na(rain_cumm), ] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 2 | 2 | dt_calc | [, rain_diff := c(0, rain_cumm[2:.N] - rain_cumm[1:(.N - 1)]), ipip(measure = “tot”, units = “mm”, var_type = “num”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 2 | 3 | dt_calc | [rain_diff != 0, ] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 2 | 4 | dt_calc | [, t_lag := c(0, date_time[2:.N] - date_time[1:(.N - 1)]), ipip(measure = “tot”, units = “sec”, var_type = “num”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 3 | 1 | dt_harvest | hsf_param_eval(hsf_table = “logg_interfere”) | logg_interfere | dt_rain | discnt | NA | NA |
| 4 | 4 | 1 | dt_join | join_param_eval(fuzzy = c(0, 600), join = “left_join”) | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 5 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_pseudo_event”) | dt_pseudo_event | dt_rain | discnt | NA | NA |
| 4 | 6 | 1 | dt_join | join_param_eval(fuzzy = 0, join = “left_join”, y_key = c(“start_dttm”, “end_dttm”)) | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 1 | dt_calc | dt[, false_tip_type := fifelse(logg_interfere_type %in% ‘on_site’, ‘interfere’, NA_character_), ipip(measure = “smp”, units = “false_tip”, var_type = “chr”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 2 | dt_calc | [, false_tip_type := fifelse(is.na(false_tip_type) & event_type %in% ‘pseudo_events’,‘pseudo_event’, false_tip_type)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 3 | dt_calc | [, false_tip_type := fifelse(t_lag == 1, ‘double_tip’, false_tip_type)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 4 | dt_calc | [, false_tip := fifelse(!is.na(false_tip_type), TRUE, FALSE), ipip(measure = “smp”, units = “false_tip”, var_type = “logi”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 5 | dt_calc | [, false_tip := fifelse(problem_gap %in% FALSE, FALSE, false_tip)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 6 | dt_calc | [, false_tip := fifelse(is.na(false_tip), FALSE, false_tip)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 7 | dt_calc | [, .(date_time, rain_cumm, rain_diff, false_tip, false_tip_type, problem_gap)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 8 | dt_calc | [false_tip == FALSE, .(date_time, false_tip, false_tip_type, problem_gap), ipip(fork_table = “pseudo_events”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 9 | dt_calc | [, .(date_time)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 4 | 7 | 10 | dt_calc | [, rain_mm := 0.254, ipip(measure = “tot”, units = “mm”, var_type = “num”)] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 5 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_rain”) | dt_rain | dt_rain | discnt | NA | NA |
| 5 | 2 | 1 | dt_calc | dt[!rain_mm == 0, ] | dt_rain | dt_rain | discnt | 2019-01-16 06:23:18 | 2023-01-08 13:43:52 |
| 5 | 3 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_rain_se”) | dt_rain_se | dt_rain | discnt | NA | NA |
| 5 | 4 | 1 | dt_join | join_param_eval(join = “full_join”) | dt_rain | dt_rain | discnt | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 |
| 6 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “gaps”) | gaps | dt_gaps_tmp | discnt | NA | NA |
| 6 | 2 | 1 | dt_calc | dt[problem_gap == TRUE, .(gap_start, gap_end, problem_gap)] | dt_gaps_tmp | dt_gaps_tmp | discnt | NA | NA |
| 7 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_rain”) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 2 | 1 | dt_agg | agg_options(agg_intervals=c(“5_mins”,“1_hours”,“1_days”)) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 2 | 2 | dt_agg | agg_options(ignore_nas=TRUE) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 2 | 3 | dt_agg | agg_options(all_phens=FALSE) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 2 | 4 | dt_agg | aggs(rain_mm=.(phen_out_name=“rain_tot”,units=“mm”)) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 1 | dt_agg | agg_options(agg_intervals=“1_days”) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 2 | dt_agg | agg_options(agg_dt_suffix=“_agg_saws”) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 3 | dt_agg | agg_options(ignore_nas=TRUE) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 4 | dt_agg | agg_options(all_phens=FALSE) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 5 | dt_agg | agg_options(agg_offset=“8hours”) | dt_rain | dt_rain | discnt | NA | NA |
| 7 | 3 | 6 | dt_agg | aggs(rain_mm=.(phen_out_name=“rain_tot”,units=“mm”)) | dt_rain | dt_rain | discnt | NA | NA |
| 8 | 1 | 1 | dt_harvest | hsf_param_eval(hsf_table = “dt_1_days_agg”) | dt_1_days_agg | dt_discnt | discnt | NA | NA |
| 8 | 2 | 1 | dt_agg | agg_options(agg_intervals=c(“1_months”,“1_years”)) | dt_discnt | dt_discnt | discnt | NA | NA |
| 8 | 2 | 2 | dt_agg | agg_options(ignore_nas=TRUE) | dt_discnt | dt_discnt | discnt | NA | NA |
The rainfall-data pipeline was built using the following script. The pipe sequence can be adjusted and re-embedded into a station file at any time.
library(ipayipi)
#' Pipeline rainfall data processing sequence
#' v1.00
#' Last updated 2024-07-30
#'
#' Summary
#' The pipe sequence is documented as `pipe_seq` below. First, calculation
#' parameters are defined as consecutively numbered 'x' variables. These
#' are used in the `pipe_seq` object.
#' This `pipe_seq` cleans/processes rainfall data from automatic tipping-
#' bucket rain gauges. Data are processed into the following time-
#' interval aggregations: "5 mins", "hourly", "daily", "monthly", and "yearly".
#' For compatibility with SAWS, an eight-hour offset is provided in addition
#' to the standard daily aggregation.
# define constants ----
#' general tipping bucket pipeline sequence for rainfall data
rain_tip <- 0.254 #' value in mm of a single tipping-bucket tip
# calc parameters -> passed to `calc_param_eval()` ----
#' filter the event metadata to extract 'pseudo events' (i.e., false tips)
#' Pseudo events
x12 <- list(
pseudo_ev1 = chainer(dt_syn_bc = "event_type == \'pseudo_events\'"),
pseudo_ev2 = chainer(dt_syn_bc = "stnd_title == station"),
pseudo_ev3 = chainer(dt_syn_bc = "qa == TRUE")
)
#' create a start and end date_time -- necessary for discontinuous data
#' this will be appended to the discontinuous data series before data
#' aggregation
x22 <- list(
fork_se1 = chainer(
dt_syn_ac = "fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)"
),
fork_se2 = chainer(dt_syn_bc = "fktest == TRUE"),
fork_se3 = chainer(dt_syn_ac = "rain_mm := rain_cumm",
measure = "tot", units = "mm", var_type = "num"
),
fork_se4 = chainer(dt_syn_ac = "rain_mm := 0"),
fork_se5 = chainer(dt_syn_ac = ".(date_time, rain_mm)")
)
#' repeat the above after it was written to dt_rain_se in chunked data
x32 <- list(
fork_se1 = chainer(
dt_syn_ac = "fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)"
),
fork_se2 = chainer(dt_syn_bc = "fktest == TRUE"),
fork_se4 = chainer(dt_syn_ac = "rain_mm := 0"),
fork_se5 = chainer(dt_syn_ac = ".(date_time, rain_mm)")
)
#' remove double tips (tips one second after the previous)
x42 <- list(
logg_remove1 = chainer(dt_syn_bc = "!is.na(rain_cumm)"),
rain_diff = chainer(
dt_syn_ac = "rain_diff := c(0, rain_cumm[2:.N] - rain_cumm[1:(.N - 1)])",
measure = "tot", units = "mm", var_type = "num"
),
rain_diff_remove = chainer(dt_syn_bc = "rain_diff != 0"),
t_lag = chainer(
dt_syn_ac = "t_lag := c(0, date_time[2:.N] - date_time[1:(.N - 1)])",
measure = "tot", units = "sec", var_type = "num"
)
)
#' summarise pseudo events--false tips
x47 <- list(
false_tip_type1 = chainer(dt_syn_ac = paste0("false_tip_type := fifelse(",
"logg_interfere_type %in% \'on_site\', \'interfere\', NA_character_)"
), temp_var = FALSE, measure = "smp", units = "false_tip", var_type = "chr"
),
false_tip_type2 = chainer(dt_syn_ac = paste0("false_tip_type := fifelse(",
"is.na(false_tip_type) & event_type %in% \'pseudo_events\',",
"\'pseudo_event\', false_tip_type)"
)),
false_tip_type3 = chainer(dt_syn_ac = paste0("false_tip_type := ",
"fifelse(t_lag == 1, \'double_tip\', false_tip_type)"
)),
false_tip4 = chainer(
dt_syn_ac = "false_tip := fifelse(!is.na(false_tip_type), TRUE, FALSE)",
temp_var = FALSE, measure = "smp", units = "false_tip", var_type = "logi"
),
false_tip5 = chainer(dt_syn_ac =
"false_tip := fifelse(problem_gap %in% FALSE, FALSE, false_tip)"
),
false_tip6 = chainer(
dt_syn_ac = "false_tip := fifelse(is.na(false_tip), FALSE, false_tip)"
),
clean_up1 = chainer(dt_syn_ac = paste0(".(date_time, rain_cumm, rain_diff, ",
"false_tip, false_tip_type, problem_gap)"
)),
false_tip_table = chainer(dt_syn_bc = "false_tip == FALSE",
dt_syn_ac = ".(date_time, false_tip, false_tip_type, problem_gap)",
fork_table = "pseudo_events"
),
clean_up2 = chainer(dt_syn_ac = paste0(".(date_time)")),
rain_mm = chainer(dt_syn_ac = paste0("rain_mm := ", rain_tip),
measure = "tot", units = "mm", var_type = "num"
)
)
#' remove old start end date-time values
x52 <- list(clean_se = chainer(dt_syn_bc = "!rain_mm == 0"))
#' filter gaps before joining --- only want 'problem gaps'
x62 <- list(pgap = chainer(dt_syn_bc = "problem_gap == TRUE",
dt_syn_ac = ".(gap_start, gap_end, problem_gap)"
))
# build pipe sequence ----
#' this builds the table from the described parameters that will be evaluated
#' to generate parameters for processing the data
pipe_seq <- pipe_seq(p = pdt(
# extract pseudo events
p_step(dt_n = 1, dtp_n = 1, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "meta_events"),
output_dt = "dt_pseudo_event"
),
p_step(dt_n = 1, dtp_n = 2, f = "dt_calc",
f_params = calc_param_eval(x12),
output_dt = "dt_pseudo_event"
),
#' create 'fork_se'
#' the fork start and end date-time will be appended to data before
#' time interval aggregations---NB for event data
#' this feeds the aggregation function the full range of data to
be aggregated. Without this the leading and trailing time where events
#' aren't logged will be cut short from the aggregation
p_step(dt_n = 2, dtp_n = 1, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "raw_rain"),
output_dt = "dt_rain_se"
),
p_step(dt_n = 2, dtp_n = 2, f = "dt_calc",
f_params = calc_param_eval(x22),
output_dt = "dt_rain_se"
),
# join 'fork_se' with old 'fork_se'
p_step(dt_n = 3, dtp_n = 1, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "dt_rain_se"),
output_dt = "dt_rain_se"
),
p_step(dt_n = 3, dtp_n = 2, f = "dt_calc",
f_params = calc_param_eval(x32),
output_dt = "dt_rain_se"
),
# find 'double tips'
p_step(dt_n = 4, dtp_n = 1, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "raw_rain"),
output_dt = "dt_rain"
),
p_step(dt_n = 4, dtp_n = 2, f = "dt_calc",
f_params = calc_param_eval(x42),
output_dt = "dt_rain"
),
# get interference events
p_step(dt_n = 4, dtp_n = 3, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "logg_interfere"),
output_dt = "dt_rain"
),
p_step(dt_n = 4, dtp_n = 4, f = "dt_join",
f_params = join_param_eval(join = "left_join", fuzzy = c(0, 600))
),
# add pseudo events
p_step(dt_n = 4, dtp_n = 5, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "dt_pseudo_event"),
output_dt = "dt_rain"
),
p_step(dt_n = 4, dtp_n = 6, f = "dt_join",
f_params = join_param_eval(join = "left_join", fuzzy = 0,
y_key = c("start_dttm", "end_dttm")
), output_dt = "dt_rain"
),
p_step(dt_n = 4, dtp_n = 7, f = "dt_calc",
f_params = calc_param_eval(x47),
output_dt = "dt_rain"
),
p_step(dt_n = 5, dtp_n = 1, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "dt_rain"),
output_dt = "dt_rain"
),
p_step(dt_n = 5, dtp_n = 2, f = "dt_calc",
f_params = calc_param_eval(x52),
output_dt = "dt_rain"
),
p_step(dt_n = 5, dtp_n = 3, f = "dt_harvest",
f_params = hsf_param_eval(hsf_table = "dt_rain_se"),
output_dt = "dt_rain"
),
p_step(dt_n = 5, dtp_n = 4, f = "dt_join",
join_param_eval(), output_dt = "dt_rain"
),
p_step(dt_n = 6, dtp_n = 1, f = "dt_harvest",
hsf_param_eval(hsf_table = "gaps"),
output_dt = "dt_gaps_tmp"
),
p_step(dt_n = 6, dtp_n = 2, f = "dt_calc",
calc_param_eval(x62), output_dt = "dt_gaps_tmp"
),
# aggregate data
p_step(dt_n = 7, dtp_n = 1, f = "dt_harvest",
hsf_param_eval(hsf_table = "dt_rain"),
output_dt = "dt_rain"
),
p_step(dt_n = 7, dtp_n = 2, f = "dt_agg",
agg_param_eval(
agg_intervals = c("5 mins", "hourly", "daily"),
ignore_nas = TRUE, all_phens = FALSE,
agg_parameters = aggs(
rain_mm = agg_params(units = "mm", phen_out_name = "rain_tot")
)
)
),
# saws daily data aggregation
p_step(dt_n = 7, dtp_n = 3, f = "dt_agg",
agg_param_eval(
agg_offset = "8 hours",
agg_intervals = c("daily"),
ignore_nas = TRUE,
all_phens = FALSE,
agg_parameters = aggs(
rain_mm = agg_params(units = "mm", phen_out_name = "rain_tot")
),
agg_dt_suffix = "_agg_saws"
)
),
p_step(dt_n = 8, dtp_n = 1, f = "dt_harvest",
hsf_param_eval(hsf_table = "dt_1_days_agg")
),
p_step(dt_n = 8, dtp_n = 2, f = "dt_agg",
agg_param_eval(
agg_intervals = c("monthly", "yearly"),
ignore_nas = TRUE
)
)
))
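As an aside, the double-tip rule applied in stage 4 can be reproduced in a standalone data.table sketch (illustrative only, not the evaluated pipeline code): a tip logged one second after the previous tip is flagged as a false tip.

```r
# Illustrative sketch of double-tip detection.
library(data.table)
tips <- data.table(date_time = as.POSIXct(c(
  "2023-01-01 10:00:00", "2023-01-01 10:00:01", # double tip: 1 s lag
  "2023-01-01 10:25:30", "2023-01-01 11:02:10"
), tz = "UTC"))
tips[, t_lag := c(0, diff(as.numeric(date_time)))] # lag in seconds
tips[, false_tip := t_lag == 1]
```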
When running the pipe sequence, we can choose which portion to run by
limiting the stages argument in
dt_process_batch(); later stages, however, cannot be run
without the earlier ones. This allows processing to be interrupted,
e.g., for imputing data.
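For example, a hypothetical call (we assume here that stages takes a vector of stage numbers, i.e., ‘dt_n’ values; check the dt_process_batch() help file):

```r
# Hypothetical sketch: run only the first four stages of the pipe sequence,
# e.g., to pause before aggregation and impute gap data.
# Assumes `stages` accepts a vector of 'dt_n' stage numbers.
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq, stages = 1:4)
```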
To run processing in parallel, use plan(multisession) on
Windows or plan(multicore) on Linux or macOS. NB! Running in
parallel disables any interactive prompts and may limit function
message/error reporting. Also, running a ‘multisession’ plan is not
always efficient: test the various options, as speed will be influenced
by system load and your local hardware. Running under ‘multisession’
may also produce errors that do not occur when
plan(sequential, split = TRUE) is called.
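A sketch of enabling a parallel backend with the future package before batch processing (assumed usage; test the backends on your own system):

```r
# Sketch: set a parallel backend with the 'future' package, run the batch
# processing, then revert to sequential evaluation.
library(future)
plan(multisession)  # separate R sessions; works on all platforms
# plan(multicore)   # forked processes; Linux/macOS only
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq)
plan(sequential)    # revert when done
```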